Skip to content

21 asyncio异步编程

你要同时请求10个API接口,用同步代码得一个一个等,总共要等10倍时间。用asyncio可以让它们"同时"发起请求,哪个先回来就先处理哪个,总时间只取决于最慢的那个请求。

asyncio是Python的异步I/O框架,用协程(coroutine)在单线程里实现并发。它不是多线程,也不是多进程,而是通过await在等待I/O时切换到其他任务,充分利用等待时间。

一、协程基础

1.1 async/await

python
import asyncio

# 用async def定义协程函数
async def hello():
    print("Hello")
    await asyncio.sleep(1)  # 等待时可以切换到其他任务
    print("World")

# 运行协程
asyncio.run(hello())

async def定义的是协程函数,调用它不会立即执行,而是返回一个协程对象。必须用awaitasyncio.run()来执行。

1.2 await

await用于等待一个可等待对象(协程、Task、Future)完成。

python
import asyncio

async def fetch_data():
    print("开始获取数据...")
    await asyncio.sleep(2)  # 模拟网络请求
    print("数据获取完成")
    return {"name": "大志", "age": 28}

async def main():
    # await会等待fetch_data完成
    result = await fetch_data()
    print(result)

asyncio.run(main())

二、任务与并发

2.1 create_task()

创建任务,让多个协程并发执行。

python
import asyncio

async def fetch(url, delay):
    print(f"开始请求 {url}")
    await asyncio.sleep(delay)
    print(f"完成请求 {url}")
    return f"{url} 的数据"

async def main():
    # 创建任务(立即开始执行)
    task1 = asyncio.create_task(fetch("url1", 2))
    task2 = asyncio.create_task(fetch("url2", 1))
    task3 = asyncio.create_task(fetch("url3", 3))

    # 等待所有任务完成
    result1 = await task1
    result2 = await task2
    result3 = await task3

    print(f"结果: {result1}, {result2}, {result3}")

asyncio.run(main())
# 总共只需3秒(取决于最慢的任务),而不是6秒

2.2 gather()

更简洁的并发方式。

python
import asyncio

async def fetch(url, delay):
    await asyncio.sleep(delay)
    return f"{url} 的数据"

async def main():
    # gather并发运行多个协程
    results = await asyncio.gather(
        fetch("url1", 2),
        fetch("url2", 1),
        fetch("url3", 3)
    )
    print(results)
    # ['url1 的数据', 'url2 的数据', 'url3 的数据']

asyncio.run(main())

2.3 gather()的异常处理

python
import asyncio

async def risky_task():
    await asyncio.sleep(1)
    raise ValueError("出错了")

async def safe_task():
    await asyncio.sleep(1)
    return "成功"

async def main():
    # return_exceptions=True:异常作为结果返回,不会中断其他任务
    results = await asyncio.gather(
        risky_task(),
        safe_task(),
        return_exceptions=True
    )
    print(results)
    # [ValueError('出错了'), '成功']

asyncio.run(main())

三、等待控制

3.1 wait()

更灵活的等待方式。

python
import asyncio

async def fetch(url, delay):
    await asyncio.sleep(delay)
    return f"{url} 完成"

async def main():
    tasks = [
        asyncio.create_task(fetch("url1", 2)),
        asyncio.create_task(fetch("url2", 1)),
        asyncio.create_task(fetch("url3", 3)),
    ]

    # FIRST_COMPLETED:有一个完成就返回
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    print(f"已完成: {len(done)}, 未完成: {len(pending)}")

    for task in done:
        print(f"结果: {task.result()}")

asyncio.run(main())

return_when参数:

  • FIRST_COMPLETED:有一个任务完成就返回
  • FIRST_EXCEPTION:有一个任务抛异常就返回
  • ALL_COMPLETED:所有任务完成才返回(默认)

3.2 as_completed()

按完成顺序获取结果。

python
import asyncio

async def fetch(url, delay):
    await asyncio.sleep(delay)
    return f"{url} 完成"

async def main():
    tasks = [
        asyncio.create_task(fetch("url1", 3)),
        asyncio.create_task(fetch("url2", 1)),
        asyncio.create_task(fetch("url3", 2)),
    ]

    # 按完成顺序获取结果
    for task in asyncio.as_completed(tasks):
        result = await task
        print(result)

asyncio.run(main())
# url2 完成
# url3 完成
# url1 完成

四、异步同步原语

4.1 Lock

异步锁,保护共享资源。

python
import asyncio

shared_resource = 0
lock = asyncio.Lock()

async def increment():
    global shared_resource
    async with lock:
        current = shared_resource
        await asyncio.sleep(0.1)  # 模拟处理
        shared_resource = current + 1

async def main():
    await asyncio.gather(*[increment() for _ in range(10)])
    print(f"结果: {shared_resource}")  # 10

asyncio.run(main())

4.2 Event

异步事件,用于任务间通知。

python
import asyncio

event = asyncio.Event()

async def waiter():
    print("等待事件...")
    await event.wait()
    print("事件触发了!")

async def setter():
    await asyncio.sleep(2)
    event.set()  # 触发事件

async def main():
    await asyncio.gather(waiter(), setter())

asyncio.run(main())

4.3 Semaphore

信号量,限制并发数量。

python
import asyncio

semaphore = asyncio.Semaphore(3)  # 最多3个并发

async def limited_task(id):
    async with semaphore:
        print(f"任务 {id} 开始")
        await asyncio.sleep(2)
        print(f"任务 {id} 完成")

async def main():
    await asyncio.gather(*[limited_task(i) for i in range(10)])

asyncio.run(main())
# 同时只有3个任务在执行

4.4 Queue

异步队列,用于生产者-消费者模式。

python
import asyncio

async def producer(queue):
    for i in range(5):
        await asyncio.sleep(1)
        await queue.put(f"数据 {i}")
        print(f"生产: 数据 {i}")

async def consumer(queue):
    while True:
        item = await queue.get()
        print(f"消费: {item}")
        queue.task_done()

async def main():
    queue = asyncio.Queue()

    # 启动生产者和消费者
    await asyncio.gather(
        producer(queue),
        consumer(queue),
        return_exceptions=True
    )

asyncio.run(main())

五、实战场景

5.1 批量请求API

python
import asyncio
import aiohttp  # 需要安装: pip install aiohttp

async def fetch_url(session, url):
    async with session.get(url) as response:
        return await response.json()

async def main():
    urls = [
        "https://api.example.com/data1",
        "https://api.example.com/data2",
        "https://api.example.com/data3",
    ]

    async with aiohttp.ClientSession() as session:
        tasks = [fetch_url(session, url) for url in urls]
        results = await asyncio.gather(*tasks)
        print(results)

asyncio.run(main())

5.2 超时控制

python
import asyncio

async def slow_task():
    await asyncio.sleep(10)
    return "完成"

async def main():
    try:
        # 设置超时时间
        result = await asyncio.wait_for(slow_task(), timeout=3)
        print(result)
    except asyncio.TimeoutError:
        print("任务超时!")

asyncio.run(main())

5.3 定时任务

python
import asyncio

async def periodic_task():
    while True:
        print("执行定时任务...")
        await asyncio.sleep(5)  # 每5秒执行一次

async def main():
    task = asyncio.create_task(periodic_task())

    # 运行30秒后取消
    await asyncio.sleep(30)
    task.cancel()

    try:
        await task
    except asyncio.CancelledError:
        print("任务已取消")

asyncio.run(main())

六、常见错误

6.1 忘记await

python
# 错误:忘记await
async def main():
    task = asyncio.create_task(some_coroutine())
    result = task  # 这是Task对象,不是结果

# 正确
async def main():
    task = asyncio.create_task(some_coroutine())
    result = await task  # 这才是结果

6.2 在同步代码中调用协程

python
# 错误:在同步函数中直接调用协程
def sync_function():
    result = await some_coroutine()  # SyntaxError

# 正确:用asyncio.run()
def sync_function():
    result = asyncio.run(some_coroutine())

七、总结

asyncio的核心:

组件用途
async def定义协程函数
await等待协程完成
asyncio.run()运行主协程
asyncio.create_task()创建任务
asyncio.gather()并发运行多个协程
asyncio.wait()灵活等待任务
asyncio.Lock/Event/Semaphore异步同步原语
asyncio.Queue异步队列

使用场景:

  • 网络请求(配合aiohttp)
  • 数据库查询(配合asyncpg等)
  • 文件I/O
  • 任何需要"等待"的操作

记住:asyncio适合I/O密集型任务,CPU密集型任务用multiprocessing